use super::{
    ArcGroup, Attr, Group, MpscReceiver, PoolCache, RawTask, Signo, Task, TaskPool, TaskQueue,
    TaskRef, Worker, WorkerSender,
};
use crate::event::{Event, SchedImpl, Scheduler, Timer, POLLIN};
use crate::{utils, CacheLineAligned, Result};
use core::future::Future;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::ptr::{self, NonNull};
use core::time::Duration;
use hioff::container_of_mut;
use hipool::{Arc, Boxed, MemPool};
use hipthread::{self, JoinHandle, LocalKey, Semaphore};

/// An `ActiveWorker` owned by (and allocated from) a `MemPool`.
pub(crate) type BoxedWorker = Boxed<'static, ActiveWorker, MemPool>;
/// The concrete scheduler implementation driving a worker's event loop.
pub(crate) type WorkerSched = SchedImpl<'static, MemPool>;
/// A pool-allocated `WorkerSched`.
pub(crate) type BoxedWorkerSched = Boxed<'static, WorkerSched, MemPool>;

/// Per-thread worker: owns an event-loop scheduler, drains its task queue,
/// and reacts to control signals from an MPSC channel.
#[allow(dead_code)]
#[repr(C)]
pub(crate) struct ActiveWorker {
    /// Event bound to this worker's task-queue fd (see `init_env`);
    /// cache-line aligned to avoid false sharing.
    task_event: CacheLineAligned<Event>,
    /// Task queues of every worker in the group; this worker drains
    /// `queue[id]`.
    queue: Arc<'static, [TaskQueue], MemPool>,
    /// Cache of task memory pools, bounded by `max_cache`.
    pool_cache: PoolCache,
    /// The event-loop scheduler this worker runs.
    sched: BoxedWorkerSched,
    /// Task currently being polled, if any (see `set_current_task`).
    current_task: Option<NonNull<RawTask>>,
    /// Count of tasks handed to the local scheduler (statistics).
    sched_cnt: u64,
    /// Sender used to load-balance unpinned tasks across worker queues.
    sender: WorkerSender,
    /// Event bound to the MPSC signal channel's read fd.
    mpsc_fd_event: Event,
    /// Receiving end of the control-signal channel (stop, rename, ...).
    mpsc_recv: MpscReceiver,
    /// Group this worker belongs to.
    group: ArcGroup,
    /// Memory pool backing this worker's allocations.
    pool: &'static MemPool,
    /// Count of tasks that finished (incremented when the current task is
    /// cleared; statistics).
    exit_cnt: u64,
    /// Index of this worker, also its slot in `queue`.
    id: u16,
    // Kept last so that drop order (declaration order) stays correct.
}

impl ActiveWorker {
    /// Spawns a worker thread, constructs an `ActiveWorker` on it, and
    /// blocks the caller until the worker has either finished initializing
    /// or bailed out.
    ///
    /// Note: initialization failure on the worker thread still yields
    /// `Ok(handle)` here — the thread simply exits early; the semaphore
    /// guard guarantees the parent is unblocked on every exit path.
    pub(crate) fn active(
        group: ArcGroup,
        id: u16,
        max_cache: usize,
        mpsc_recv: MpscReceiver,
        queue: Arc<'static, [TaskQueue], MemPool>,
    ) -> Result<JoinHandle<()>> {
        let sem = Arc::new(Semaphore::new(0)?)?;
        let sem_worker = sem.clone();

        let handle = hipthread::spawn(move || {
            // Guard posts the semaphore so the parent in `sem.wait()` is
            // released even on the early-return error paths below.
            let sem_guard = utils::Guard::new(move || sem_worker.post());
            let Ok(pool) = MemPool::new_boxed(0) else {
                return;
            };
            // Leak the pool to obtain a `&'static MemPool`, then immediately
            // re-own the allocation with a guard `Boxed` so it is still
            // released when this thread's stack frame unwinds.
            let (pool, layout, alloc) = pool.leak();
            let _guard = unsafe { Boxed::from_with(pool.into(), layout, alloc) };
            let Ok(mut worker) = Self::new_in(pool, group, id, max_cache, mpsc_recv, queue) else {
                return;
            };
            // NOTE(review): assumes `commit()` fires the guard (posts the
            // semaphore) before entering the run loop — a plain disarm would
            // leave the parent blocked until `run()` returns. Confirm
            // against `utils::Guard`.
            sem_guard.commit();
            worker.run();
        })?;

        // Wait for the worker thread to signal (successful or failed init).
        sem.wait();
        Ok(handle)
    }

    /// Builds the worker inside `pool` and registers its fd events with the
    /// freshly created scheduler (`init_env`).
    pub(crate) fn new_in(
        pool: &'static MemPool,
        group: ArcGroup,
        id: u16,
        max_cache: usize,
        mpsc_recv: MpscReceiver,
        queue: Arc<'static, [TaskQueue], MemPool>,
    ) -> Result<BoxedWorker> {
        let mut worker = Boxed::new_in(
            pool,
            Self {
                task_event: CacheLineAligned::new(Event::new(Self::task_handle)),
                queue,
                sender: WorkerSender::new(id),
                mpsc_fd_event: Event::new(Self::mpsc_fd_handle),
                mpsc_recv,
                pool_cache: PoolCache::new(max_cache),
                sched: SchedImpl::new_in(pool)?,
                current_task: None,
                sched_cnt: 0,
                pool,
                exit_cnt: 0,
                group,
                id,
            },
        )?;
        worker.init_env()?;
        Ok(worker)
    }

    /// Runs the event loop, publishing `self` as this thread's current
    /// worker for the duration and restoring the previous one afterwards.
    ///
    /// NOTE(review): the previous value is not restored if `sched.run()`
    /// unwinds — acceptable only if panics abort in this runtime; confirm.
    pub(crate) fn run(&mut self) {
        let old = Self::current();
        Self::set_current(Some(self));
        self.sched.run();
        Self::set_current(old);
    }

    /// Returns whether `group` is exactly this worker's group
    /// (pointer-identity comparison, not value equality).
    pub(crate) fn in_group(&self, group: &Group) -> bool {
        core::ptr::eq(group, &*self.group)
    }

    /// Registers the worker's two fd events with the scheduler and stores a
    /// back-pointer to `self` as the scheduler's private data (consumed by
    /// `from_sched`).
    fn init_env(&mut self) -> Result<()> {
        let q_fd = self.queue[self.id as usize].fd();
        unsafe {
            self.sched
                .add_fd_event(&self.task_event.0, POLLIN, q_fd, 0)?
        };
        unsafe {
            self.sched.add_fd_event(
                &self.mpsc_fd_event,
                POLLIN,
                self.mpsc_recv.read_fd_event().0,
                0,
            )
        }
        // Roll back the first registration if the second one fails.
        .map_err(|e| {
            let _ = unsafe { self.sched.del_fd_event(q_fd) };
            e
        })?;
        let data = self as *const _ as *const ();
        self.sched.set_private_data(data);
        Ok(())
    }
}

/// Thread-local pointer to the worker currently running on this thread;
/// set and cleared by `ActiveWorker::run`, null when no worker is active.
static CURRENT_WORKER: LocalKey<ActiveWorker> = LocalKey::new();

impl ActiveWorker {
    /// Recovers the worker from a scheduler's private-data pointer.
    ///
    /// # Safety
    /// `sched` must carry a private-data pointer installed by
    /// `ActiveWorker::init_env`, and that worker must still be alive.
    pub(crate) unsafe fn from_sched(sched: &dyn Scheduler) -> &'static mut Self {
        let raw = sched.private_data().cast_mut().cast::<Self>();
        &mut *raw
    }

    /// Returns the worker bound to the calling thread, or `None` when the
    /// thread-local slot is null.
    pub(crate) fn current() -> Option<&'static mut Self> {
        let raw = CURRENT_WORKER.get().cast_mut();
        // SAFETY: a non-null pointer in CURRENT_WORKER always refers to the
        // ActiveWorker whose `run` is active on this thread; `as_mut`
        // returns None for null.
        unsafe { raw.as_mut() }
    }

    /// Binds the calling thread's current worker, or clears it with `None`.
    fn set_current(worker: Option<&mut Self>) {
        match worker {
            Some(w) => CURRENT_WORKER.set(w),
            None => CURRENT_WORKER.set(ptr::null()),
        }
    }

    /// Obtains a task memory pool, served from the per-worker cache.
    fn get_task_pool(&mut self) -> Result<TaskPool> {
        self.pool_cache.pop()
    }
}

impl ActiveWorker {
    /// Allocates a new task for `future` from a cached task pool.
    fn task_from<T: Future>(&mut self, future: T, attr: &Attr) -> Result<TaskRef> {
        let pool = self.get_task_pool()?;
        Task::new_in(pool, future, attr, self.group.clone())
    }

    /// fd-event callback: fires when this worker's task queue becomes
    /// readable; drains the popped intrusive list and schedules every task
    /// locally.
    fn task_handle(e: &Event, _events: u32, _flags: u16, _sched: &mut dyn Scheduler) {
        // Recover the worker from the embedded `task_event` field.
        let this = unsafe { container_of_mut!(e, Self, task_event.0) };
        let queue = this.queue.as_ref();
        let mut tasks = queue[this.id as usize].pop();
        while !tasks.is_null() {
            let task = unsafe { TaskRef::from(tasks) };
            // Read the link before handing the task off, since scheduling
            // may invalidate it.
            tasks = task.next;
            this.sched_local(task);
            this.sched_cnt += 1;
        }
    }

    /// fd-event callback: drains up to `RECV_BUF_MAX` control signals from
    /// the MPSC channel and acts on each one.
    fn mpsc_fd_handle(e: &Event, _events: u32, _flags: u16, _sched: &mut dyn Scheduler) {
        const RECV_BUF_MAX: usize = 64;
        let this = unsafe { container_of_mut!(e, Self, mpsc_fd_event) };
        // Stack buffer of uninitialised slots; `assume_init` on an array of
        // `MaybeUninit` is sound (the standard uninit-array pattern).
        let mut signals = MaybeUninit::<[MaybeUninit<Signo>; RECV_BUF_MAX]>::uninit();
        let signals = unsafe { signals.assume_init_mut() };
        if let Some(cnt) = this.mpsc_recv.try_recv_slice(signals) {
            // Only the first `cnt` slots were written by the receiver.
            for sig in &mut signals[..cnt] {
                match unsafe { sig.assume_init_read() } {
                    Signo::SIG_STOP => {
                        this.sched.stop();
                    }
                    Signo::SIG_NAME(mut name) => {
                        if name.is_empty() {
                            name = "hirun";
                        }
                        // Format "<name>_<id>" into a fixed 16-byte buffer.
                        // NOTE(review): `tname` stays zero-padded, so the
                        // &str passed on includes trailing NULs — presumably
                        // thrd_setname expects a NUL-terminated name; confirm.
                        let mut tname = [0_u8; 16];
                        hifmt::bprint!(&mut tname, "{:rs}_{:u}", name, this.id);
                        hipthread::thrd_setname(core::str::from_utf8(&tname).unwrap_or(name));
                    }
                }
            }
        }
    }
}

impl Worker for ActiveWorker {
    /// Creates a task from `future` and dispatches it.
    ///
    /// `attr.hash == 0`: the task is load-balanced through `sender`, which
    /// hands it back when this worker should run it itself. A non-zero hash
    /// pins the task to worker `hash % queue.len()` — pushed to that
    /// worker's queue, or scheduled locally when it is this worker.
    fn spawn<T: Future>(&mut self, future: T, attr: &Attr) -> Result<TaskRef> {
        let mut task = self.task_from(future, attr)?;
        if attr.hash == 0 {
            if let Some(task) = self.sender.send(&self.queue, task.clone()) {
                self.sched_local(task);
            }
        } else {
            let id = (attr.hash % self.queue.len()) as u16;
            task.status.set_local(id);
            if id != self.id {
                self.queue[id as usize].push(task.clone());
            } else {
                self.sched_local(task.clone());
            }
        }
        Ok(task)
    }

    /// Creates a task pinned to this worker and schedules it immediately.
    fn spawn_local<T: Future>(&mut self, future: T, attr: &Attr) -> Result<TaskRef> {
        let mut task = self.task_from(future, attr)?;
        task.status.set_local(self.id);
        self.sched_local(task.clone());
        Ok(task)
    }

    /// Wakes `task`: scheduled locally when unpinned or pinned to this
    /// worker, otherwise pushed to the owning worker's queue.
    fn wake(&mut self, task: TaskRef, local: Option<u16>) {
        // Handle the most common case first; Rust offers no branch-prediction hints.
        #[allow(clippy::unnecessary_unwrap)]
        if local.is_none() || local.unwrap() == self.id {
            self.sched_local(task);
        } else {
            self.queue[local.unwrap() as usize].push(task);
        }
    }

    /// Id of this worker's group.
    fn group_id(&self) -> u8 {
        self.group.id()
    }

    /// Id of this worker within the group.
    fn worker_id(&self) -> u16 {
        self.id
    }

    /// Records the task being polled; clearing it (`None`) counts one more
    /// completed task into `exit_cnt`.
    fn set_current_task(&mut self, current: Option<NonNull<RawTask>>) {
        self.current_task = current;
        if self.current_task.is_none() {
            self.exit_cnt += 1;
        }
    }

    /// Hands `task` to the local scheduler at the task's priority.
    ///
    /// The `TaskRef` is wrapped in `ManuallyDrop` so its reference count is
    /// deliberately not released here: ownership of that reference is
    /// transferred to the scheduler through the task's embedded event.
    fn sched_local(&mut self, task: TaskRef) {
        let task = ManuallyDrop::new(task);
        let pri = task.status.priority() as i32;
        unsafe { self.sched.add_event(&task.event, pri) };
        self.sched_cnt += 1;
    }

    /// Returns a task pool to the per-worker cache for reuse.
    fn cache_task_pool(&mut self, pool: TaskPool) {
        self.pool_cache.push(pool);
    }

    /// The memory pool backing this worker's allocations.
    fn pool(&self) -> &'static MemPool {
        self.pool
    }
}

/// `Scheduler` facade: every method forwards verbatim to the inner `sched`
/// event loop, so a worker can be used anywhere a scheduler is expected.
impl Scheduler for ActiveWorker {
    unsafe fn add_fd_event(&mut self, e: &Event, events: u32, fd: i32, flags: u16) -> Result<()> {
        self.sched.add_fd_event(e, events, fd, flags)
    }

    unsafe fn mod_fd_event(&mut self, e: &Event, events: u32, fd: i32, flags: u16) -> Result<()> {
        self.sched.mod_fd_event(e, events, fd, flags)
    }

    unsafe fn del_fd_event(&mut self, fd: i32) -> Result<()> {
        self.sched.del_fd_event(fd)
    }
    unsafe fn add_event(&mut self, e: &Event, priority: i32) {
        self.sched.add_event(e, priority)
    }

    unsafe fn del_event(&mut self, e: &Event) {
        self.sched.del_event(e)
    }

    unsafe fn set_timer(&mut self, t: &Timer, msecs: u32) {
        self.sched.set_timer(t, msecs)
    }

    unsafe fn del_timer(&mut self, t: &Timer) {
        self.sched.del_timer(t)
    }

    fn private_data(&self) -> *const () {
        self.sched.private_data()
    }

    fn set_private_data(&mut self, private: *const ()) {
        self.sched.set_private_data(private)
    }

    fn stop(&self) {
        self.sched.stop()
    }

    fn now(&self) -> Duration {
        self.sched.now()
    }
}
